package com.katesoft.scale4j.rttp.hibernate;

import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import net.jcip.annotations.ThreadSafe;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

import com.hazelcast.core.AtomicNumber;
import com.hazelcast.core.HazelcastInstance;
import com.katesoft.scale4j.log.LogFactory;
import com.katesoft.scale4j.log.Logger;
import com.katesoft.scale4j.persistent.model.unified.AbstractPersistentEntity;
import com.katesoft.scale4j.rttp.client.ClusterIsNotActiveException;
import com.katesoft.scale4j.rttp.subscription.CloudSubscriber;
import com.katesoft.scale4j.rttp.subscription.ICloudSubscription;
import com.katesoft.scale4j.rttp.tools.HazelcastLauncher;

/**
 * This is helper class which allows to link spring and hazelcast and contains few useful methods.
 * 
 * @author kate2007.
 */
@ThreadSafe
public class SpringHazelcastBridge implements ApplicationContextAware {
   private final Logger logger = LogFactory.getLogger(getClass());
   private ApplicationContext applicationContext;
   private HazelcastLauncher hazelcastLauncher;
   private CloudSubscriber cloudSubscriber;

   @Override
   public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
      logger.debug("Application Context [%s] injected", applicationContext);
      this.applicationContext = applicationContext;
   }

   public HazelcastInstance getHazelcastInstance() {
      return hazelcastLauncher.getHazelcastInstance();
   }

   /**
    * return boolean wrapper - return class is immutable object and you would need to use
    * {@link SpringHazelcastBridge#updateAtomicBoolean(String, boolean)} to modify the state of
    * distributed boolean value.
    * 
    * @param name
    *           unique name for atomic boolean
    * @return immutable boolean wrapper
    */
   public AtomicBoolean getAtomicBoolean(String name) {
      AtomicNumber atomicNumber = getHazelcastInstance().getAtomicNumber(name);
      return new AtomicBoolean(atomicNumber.get() != 0);
   }

   public void updateAtomicBoolean(String name, boolean newValue) {
      AtomicNumber atomicNumber = getHazelcastInstance().getAtomicNumber(name);
      atomicNumber.set(newValue ? 1 : 0);
   }

   @SuppressWarnings({ "unchecked", "rawtypes" })
   public CloudSubscriber getCloudSubscriber() {
      if (cloudSubscriber == null) {
         final Map<String, CloudSubscriber> cloudSubscribers = applicationContext
                  .getBeansOfType(CloudSubscriber.class);
         if (cloudSubscribers.isEmpty()) {
            cloudSubscriber = new CloudSubscriber();
            final Collection<ICloudSubscription> values = applicationContext.getBeansOfType(
                     ICloudSubscription.class).values();
            cloudSubscriber
                     .setCloudSubscriptions(new LinkedHashSet<ICloudSubscription<? super AbstractPersistentEntity>>() {
                        private static final long serialVersionUID = 5382328889952385799L;

                        {
                           for (ICloudSubscription value : values) {
                              add(value);
                           }
                        }
                     });
            cloudSubscriber.setRttpHazelcastBridge(this);
         } else {
            Collection cloudSubscriptions = new LinkedHashSet();
            for (CloudSubscriber subscriber : cloudSubscribers.values()) {
               cloudSubscriptions.addAll(subscriber.getCloudSubscriptions());
            }
            cloudSubscriber = new CloudSubscriber();
            cloudSubscriber.setCloudSubscriptions(cloudSubscriptions);
            cloudSubscriber.setRttpHazelcastBridge(this);
         }
      }
      return cloudSubscriber;
   }

   /**
    * @return active hazelcast instance
    * @throws ClusterIsNotActiveException
    *            if hazelcast instance is not active
    */
   public HazelcastInstance getRunningInstance() throws ClusterIsNotActiveException {
      if (getHazelcastInstance().getLifecycleService().isRunning()) {
         return getHazelcastInstance();
      }
      throw new ClusterIsNotActiveException(getHazelcastInstance());
   }

   @Required
   public void setHazelcastLauncher(HazelcastLauncher hazelcastLauncher) {
      this.hazelcastLauncher = hazelcastLauncher;
   }

   public HazelcastLauncher getHazelcastLauncher() {
      return hazelcastLauncher;
   }
}
